{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Producing and Consuming Messages to/from Kafka, using Python Clients\n",
    "\n",
    "<p style=\"color:red\">Tested with python 3.6 and python 2.7</p>\n",
    "\n",
    "Before running this notebook, you should have created a Kafka topic with a name that you can configure in the `TOPIC_NAME` variable below in the code.\n",
    "\n",
    "The screenshots below illustrates the steps necessary to create a Kafka topic on Hops\n",
    "\n",
    "![kafka.png](./images/kafka.png)\n",
    "![kafka2.png](./images/kafka2.png)\n",
    "![kafka3.png](./images/kafka3.png)\n",
    "![kafka4.png](./images/kafka4.png)\n",
    "![kafka5.png](./images/kafka5.png)\n",
    "![kafka6.png](./images/kafka6.png)\n",
    "![kafka7.png](./images/kafka7.png)\n",
    "![kafka8.png](./images/kafka8.png)\n",
    "\n",
    "In this notebook we use two python dependencies:\n",
    "\n",
    "- [hops-util-py](https://github.com/logicalclocks/hops-util-py)\n",
    "- [confluent-kafka-python](https://github.com/confluentinc/confluent-kafka-python)\n",
    " \n",
    " To install the `confluent-kafka-python` libary, use the Hopsworks UI:\n",
    " \n",
    "![kafka9.png](./images/kafka9.png)\n",
    "![kafka10.png](./images/kafka10.png)\n",
    " \n",
    "The hops-util library is already installed by default when projects are created on Hops. However, if you need to re-install it for some reason you can use the Hopsworks UI to first uninstall it and the install it from pip using the same method as described above."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting Spark application\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<table>\n",
       "<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>31</td><td>application_1538483294796_0034</td><td>pyspark</td><td>idle</td><td><a target=\"_blank\" href=\"http://hopsworks0:8088/proxy/application_1538483294796_0034/\">Link</a></td><td><a target=\"_blank\" href=\"http://hopsworks0:8042/node/containerlogs/container_e01_1538483294796_0034_01_000001/KafkaPython__meb10000\">Link</a></td><td>✔</td></tr></table>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "SparkSession available as 'spark'.\n"
     ]
    }
   ],
   "source": [
    "from hops import kafka\n",
    "from hops import tls\n",
    "from confluent_kafka import Producer, Consumer"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Constants\n",
    "\n",
    "Define the name of the topic you have created here"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "TOPIC_NAME = \"test\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can get the schema  defined for the topic by using the utility-library to make a REST-call to Hopsworks:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'contents': '[]', 'version': 0}"
     ]
    }
   ],
   "source": [
    "kafka.get_schema(TOPIC_NAME)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define Kafka Config\n",
    "\n",
    "The hops-util-py library provides utility methods for setting up secure communication using Kafka producers and consumers running inside a Hopsworks cluster. You can use this utility methods in combination with any python kafka client. In this noteobook we will be using confluent-kafka-python."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "config = {\n",
    "    \"bootstrap.servers\": kafka.get_broker_endpoints(),\n",
    "    \"security.protocol\": kafka.get_security_protocol(),\n",
    "    \"ssl.ca.location\": tls.get_ca_chain_location(),\n",
    "    \"ssl.certificate.location\": tls.get_client_certificate_location(),\n",
    "    \"ssl.key.location\": tls.get_client_key_location(),\n",
    "    \"group.id\": \"something\"\n",
    "}\n",
    "# equivalently you can use:\n",
    "# config = kafka.get_kafka_default_config()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Kafka Producer and Consumer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "producer = Producer(config)\n",
    "consumer = Consumer(config)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Subscribe the Consumer to your Topic\n",
    "\n",
    "The confluent_kafka api provides a callback-hook for getting notified when a consumer has been assigned to a different Kafka partition"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "def print_assignment(consumer, partitions):\n",
    "    \"\"\" \n",
    "    Callback called when a Kafka consumer is assigned to a partition\n",
    "    \"\"\"\n",
    "    print('Assignment:', partitions)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "# the consumer can be subscribed to multiple topics\n",
    "topics = [TOPIC_NAME]\n",
    "consumer.subscribe(topics, on_assign=print_assignment)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Produce Messages to your Topic Using the Producer\n",
    "\n",
    "The confluent_kafka api provides a callback-hook so that we can get notified once messages have been successfully acknowledged by the Kafka brokers (the produce method is asynchronous so when it returns we cannot be guaranteed that messages actually was received by the brokers)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "def delivery_callback(err, msg):\n",
    "    \"\"\"\n",
    "    Optional per-message delivery callback (triggered by poll() or flush())\n",
    "    when a message has been successfully delivered or permanently\n",
    "    failed delivery (after retries).\n",
    "    \"\"\"\n",
    "    if err:\n",
    "        print(\"Message failed delivery: {}\".format(err))\n",
    "    else:\n",
    "        print('Message: {} delivered to topic: {}, partition: {}, offset: {}, timestamp: {}'.format(msg.value(), msg.topic(), msg.partition(), msg.offset(), msg.timestamp()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Message: b'message 0' delivered to topic: test, partition: 1, offset: 70, timestamp: (1, 1538566389535)\n",
      "Message: b'message 1' delivered to topic: test, partition: 1, offset: 71, timestamp: (1, 1538566389535)\n",
      "Message: b'message 2' delivered to topic: test, partition: 1, offset: 72, timestamp: (1, 1538566389535)\n",
      "Message: b'message 3' delivered to topic: test, partition: 1, offset: 73, timestamp: (1, 1538566389535)\n",
      "Message: b'message 4' delivered to topic: test, partition: 1, offset: 74, timestamp: (1, 1538566389535)\n",
      "Message: b'message 5' delivered to topic: test, partition: 1, offset: 75, timestamp: (1, 1538566389535)\n",
      "Message: b'message 6' delivered to topic: test, partition: 1, offset: 76, timestamp: (1, 1538566389535)\n",
      "Message: b'message 7' delivered to topic: test, partition: 1, offset: 77, timestamp: (1, 1538566389535)\n",
      "Message: b'message 8' delivered to topic: test, partition: 1, offset: 78, timestamp: (1, 1538566389535)\n",
      "Message: b'message 9' delivered to topic: test, partition: 1, offset: 79, timestamp: (1, 1538566389535)\n",
      "0"
     ]
    }
   ],
   "source": [
    "for i in range(0, 10):\n",
    "    producer.produce(TOPIC_NAME, \"message {}\".format(i), \"key\", callback=delivery_callback)\n",
    "\n",
    "# Trigger the sending of all messages to the brokers, 10sec timeout\n",
    "producer.flush(10) "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Poll Messages from your Topic Using the Consumer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Consumed Message: b'message 0' from topic: test, partition: 1, offset: 70, timestamp: (1, 1538566389535)\n",
      "Consumed Message: b'message 1' from topic: test, partition: 1, offset: 71, timestamp: (1, 1538566389535)\n",
      "Consumed Message: b'message 2' from topic: test, partition: 1, offset: 72, timestamp: (1, 1538566389535)\n",
      "Consumed Message: b'message 3' from topic: test, partition: 1, offset: 73, timestamp: (1, 1538566389535)\n",
      "Consumed Message: b'message 4' from topic: test, partition: 1, offset: 74, timestamp: (1, 1538566389535)\n",
      "Consumed Message: b'message 5' from topic: test, partition: 1, offset: 75, timestamp: (1, 1538566389535)\n",
      "Consumed Message: b'message 6' from topic: test, partition: 1, offset: 76, timestamp: (1, 1538566389535)\n",
      "Consumed Message: b'message 7' from topic: test, partition: 1, offset: 77, timestamp: (1, 1538566389535)\n",
      "Consumed Message: b'message 8' from topic: test, partition: 1, offset: 78, timestamp: (1, 1538566389535)\n",
      "Consumed Message: b'message 9' from topic: test, partition: 1, offset: 79, timestamp: (1, 1538566389535)"
     ]
    }
   ],
   "source": [
    "for i in range(0, 10):\n",
    "    msg = consumer.poll(timeout=5.0)\n",
    "    if msg is not None:\n",
    "        print('Consumed Message: {} from topic: {}, partition: {}, offset: {}, timestamp: {}'.format(msg.value(), msg.topic(), msg.partition(), msg.offset(), msg.timestamp()))\n",
    "    else:\n",
    "        print(\"Topic empty, timeout when trying to consume message, try to produce messages to the topic and then re-consume\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
